/**
 * Returns the partitions of the given topic, each paired with its leader broker id (if any).
 *
 * Metadata is read from `topicPartitionInfo`. On a miss, `updateInfo` is invoked for this
 * topic and the lookup is retried exactly once.
 *
 * @param topic         the topic to look up
 * @param correlationId passed through to `updateInfo` when the cached entry is missing
 * @return one [[PartitionAndLeader]] per partition, ordered by ascending partition id; the
 *         leader id is `None` for partitions that have no leader
 * @throws KafkaException if no metadata is present for the topic even after the refresh, or
 *                        if the topic metadata carries no partition entries
 */
def getBrokerPartitionInfo(topic: String, correlationId: Int): Seq[PartitionAndLeader] = {
  debug("Getting broker partition info for topic %s".format(topic))

  // Serve from the cache when possible; otherwise refresh it and look again a single time.
  val metadata: TopicMetadata = topicPartitionInfo.get(topic).getOrElse {
    updateInfo(Set(topic), correlationId)
    topicPartitionInfo.get(topic).getOrElse(
      throw new KafkaException("Failed to fetch topic metadata for topic: " + topic))
  }

  val partitions = metadata.partitionsMetadata
  if (partitions.isEmpty) {
    // Prefer the error reported in the metadata; fall back to a generic message when
    // the error code claims success but there is still nothing to return.
    val failure =
      if (metadata.errorCode == ErrorMapping.NoError)
        new KafkaException("Topic metadata %s has empty partition metadata and no error code".format(metadata))
      else
        new KafkaException(ErrorMapping.exceptionFor(metadata.errorCode))
    throw failure
  }

  val withLeaders = partitions.map { pm =>
    val leaderId = pm.leader.map(_.id)
    leaderId match {
      case Some(id) =>
        debug("Partition [%s,%d] has leader %d".format(topic, pm.partitionId, id))
      case None =>
        debug("Partition [%s,%d] does not have a leader yet".format(topic, pm.partitionId))
    }
    new PartitionAndLeader(topic, pm.partitionId, leaderId)
  }

  // Callers receive the partitions in ascending partition-id order.
  withLeaders.sortBy(_.partitionId)
}